package com.atguigu.chapter05.sink;

import com.alibaba.fastjson.JSON;
import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.Arrays;
import java.util.List;

/**
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/7/17 10:44
 */
public class Flink02_Sink_ES_Unbouded {
    public static void main(String[] args) throws Exception {
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        
        List<HttpHost> hosts = Arrays.asList(new HttpHost("hadoop162", 9200), new HttpHost("hadoop163", 9200), new HttpHost("hadoop164", 9200));
        ElasticsearchSink.Builder<WaterSensor> esBuilder = new ElasticsearchSink.Builder<>(
            hosts,
            new ElasticsearchSinkFunction<WaterSensor>() {
                @Override
                public void process(WaterSensor element,
                                    RuntimeContext ctx,
                                    RequestIndexer indexer) {
                    String json = JSON.toJSONString(element);
                    
                    IndexRequest index = Requests
                        .indexRequest()
                        .index("sensor")
                        .type("_doc")
                        .id(element.getId())
                        .source(json, XContentType.JSON);
                    
                    indexer.add(index);
                }
            }
        );
        esBuilder.setBulkFlushInterval(1000);
        esBuilder.setBulkFlushMaxActions(1);
        
        env
            .socketTextStream("hadoop162", 9999)
            .map(line -> {
                String[] data = line.split(",");
                return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
            })
            .keyBy(WaterSensor::getId)
            .max("vc")
            .addSink(esBuilder.build());
        
        env.execute();
        
    }
}
